Spark Functions

Spark SQL provides a pivot() function to rotate data from rows into columns (transpose rows to columns). It is an aggregation where the distinct values of one of the grouping columns are transposed into individual columns. With the employee DataFrame below, to get the total salary for each job in each department, we group by job, pivot by deptno, and sum sal.

val empDF = spark.createDataFrame(Seq(
      (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
    )).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")

empDF.groupBy("job").pivot("deptno").sum("sal").show()



from pyspark.sql import functions as F

# Sample data: (id, day, price, units)
d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),
     (101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),
     (102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]

mydf = spark.createDataFrame(d, ['id','day','price','units'])
mydf.show()

# Build a combined column name such as price_1, then pivot on it
pvtdf = mydf.withColumn('combcol', F.concat(F.lit('price_'), mydf['day'])) \
    .groupby('id') \
    .pivot('combcol') \
    .agg(F.first('price'))

pvtdf.show()
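pivot() also accepts an explicit list of values; supplying them is more efficient because Spark otherwise has to compute the distinct pivot values itself. A minimal sketch against the same mydf (the names price_1 to price_4 follow from the day values above, and pvtdf2 is just an illustrative name):

# Passing the pivot values explicitly skips the extra pass Spark would
# otherwise run to collect the distinct values of combcol.
pvtdf2 = mydf.withColumn('combcol', F.concat(F.lit('price_'), mydf['day'])) \
    .groupby('id') \
    .pivot('combcol', ['price_1', 'price_2', 'price_3', 'price_4']) \
    .agg(F.first('price'))

pvtdf2.show()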




This article shows how to 'delete' rows/data from a Spark data frame using Python. The word 'delete' is in quotes because we are not really deleting the data: because of Spark's lazy evaluation mechanism for transformations, this is very different from creating a data frame in memory and then physically deleting some rows from it. Instead, we produce a new data frame that simply excludes the unwanted rows.

from pyspark.sql import SparkSession

appName = "Python Example - 'Delete' Data from DataFrame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [{"Category": 'Category A', "ID": 1, "Value": 12.40},
        {"Category": 'Category B', "ID": 2, "Value": 30.10},
        {"Category": 'Category C', "ID": 3, "Value": 100.01}
        ]

# Create data frame
df = spark.createDataFrame(data)
print(df.schema)

# Delete/Remove data from the dataframe
df2 = df.where("Category <> 'Category B'")
df2.show()
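In PySpark, where() is an alias for filter(), so the same 'delete' can also be written with a column expression instead of a SQL string. A minimal sketch (df3 is just an illustrative name):

from pyspark.sql import functions as F

# Keep every row except Category B, using a column expression
df3 = df.filter(F.col("Category") != "Category B")
df3.show()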

PySpark DataFrame APIs provide two drop-related methods: drop and dropDuplicates (or drop_duplicates). The former drops specified column(s) from a DataFrame while the latter drops duplicate rows. The example below demonstrates drop; a sketch of dropDuplicates follows it.

from pyspark.sql import SparkSession

appName = "PySpark drop and dropDuplicates"
master = "local"

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Create a dataframe
df = spark.sql("""SELECT ACCT, TXN_DT, AMT FROM VALUES 
(101,10.01, DATE'2021-01-01'),
(101,10.01, DATE'2021-01-01'),
(101,102.01, DATE'2021-01-01')
AS TXN(ACCT,AMT,TXN_DT)""")

print(df.schema)

# Use drop function
df.drop('TXN_DT').show(truncate=False)
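dropDuplicates can be sketched against the same df: called with no argument it removes rows that are identical across all columns, and passing a list of column names deduplicates on that subset only.

# Use dropDuplicates function: remove fully duplicated rows
df.dropDuplicates().show(truncate=False)

# Deduplicate on a subset of columns only
df.dropDuplicates(['ACCT', 'TXN_DT']).show(truncate=False)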

The explode function flattens an array column into one row per element. In the Scala example below, each value in FieldC becomes its own row, and the original array column is then dropped.

import org.apache.spark.sql.functions._
import spark.implicits._

val data = spark.sparkContext.parallelize(Seq(
  (1, "A", List(1, 2, 3)),
  (2, "B", List(3, 5))
)).toDF("FieldA", "FieldB", "FieldC")

data.show

data.withColumn("ExplodedField", explode($"FieldC")).drop("FieldC").show


Dataframe concat string

concat() joins the column values back to back, while concat_ws() joins them with the separator passed as its first argument:

df_pres.select(concat($"pres_id",$"pres_name")).show()
df_pres.select(concat_ws("-",$"pres_id",$"pres_name")).show()
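df_pres is not defined in this post, so here is a self-contained PySpark sketch of the same two functions (the people DataFrame is made up for illustration):

from pyspark.sql import functions as F

people = spark.createDataFrame([(7369, "SMITH"), (7499, "ALLEN")], ["id", "name"])

# concat joins the values directly; concat_ws inserts the "-" separator
people.select(
    F.concat(F.col("id").cast("string"), F.col("name")).alias("id_name"),
    F.concat_ws("-", F.col("id").cast("string"), F.col("name")).alias("id_name_ws")
).show()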
